package day02.transform;

import beans.SensorReading;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Flink 流处理 API - 滚动聚合算子
 *
 * @author lvbingbing
 * @date 2021-12-09 22:15
 */
public class FlinkTransform02 {
    public static void main(String[] args) throws Exception {
        // 1、创建FlinkTransform00对象，有参构造会初始化 env，并从文件中读取数据
        int parallelism = 4;
        FlinkTransform00 flinkTransform = new FlinkTransform00(parallelism);
        // 2、获取执行环境
        StreamExecutionEnvironment env = flinkTransform.getEnv();
        // 3、学习 KeyBy 和 滚动聚合算子
        studyKeyByAndRollingAggregate(flinkTransform.getSensorReadingStream());
        // 4、触发程序执行
        env.execute();
    }

    /**
     * 学习 KeyBy 和 滚动聚合算子
     * <p>
     * 1、KeyBy()：DataStream → KeyedStream：逻辑地将一个流拆分成不相交的分区，每个分区包含具有相同 key 的元素，在内部以 hash 的形式实现的。<br>
     * Tips：keyBy() 类似于 MySQL 中的 group by。先分组，再聚合
     * <p>
     * 2、滚动聚合算子：针对 KeyedStream 的每一个支流做聚合。<br>
     * ⚫ sum()<br>
     * ⚫ min()<br>
     * ⚫ max()<br>
     * ⚫ minBy()<br>
     * ⚫ maxBy()<br>
     *
     * @param sensorReadingStream <br>
     */
    private static void studyKeyByAndRollingAggregate(DataStream<SensorReading> sensorReadingStream) {
        // 分组
        KeyedStream<SensorReading, String> keyedStream = sensorReadingStream.keyBy(SensorReading::getId);
        KeyedStream<SensorReading, Tuple> keyedStream1 = sensorReadingStream.keyBy("id");
        // 滚动聚合，每一个分组取最大的温度值。
        // 滚动时只会更新最大温度值，不会更新其他字段的值
        DataStream<SensorReading> resultStream = keyedStream.max("temperature");
        // resultStream.print("result");
        // 滚动聚合，每一个分组取最大的温度值对应的那条记录
        DataStream<SensorReading> resultStream1 = keyedStream1.maxBy("temperature");
        resultStream1.print("result1");
    }
}